{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "import datetime\n",
    "import os, re, sys\n",
    "import pandas as pd\n",
    "from pyspark.sql import Row\n",
    "# from test_helper import Test"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "# Use Pandas to read sample and inspect\n",
    "logDataFile = \"../_datasets_downloads/NASA_access_log_Aug95.gz\"\n",
    "logfile = pd.read_table(logDataFile, header=None, encoding='utf-8')\n",
    "# test_log = logfile[0][0]\n",
    "# test_log\n",
    "logfile"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "APACHE_ACCESS_LOG_PATTERN = '^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(\\S+) (\\S+)\\s*(\\S*)\" (\\d{3}) (\\S+)'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1839\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt'"
      ]
     },
     "execution_count": 7,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We create a regular expression pattern to extract the nine fields of the log line using the Python regular expression search function. The function returns a pair consisting of a Row object and 1. If the log line fails to match the regular expression, the function returns a pair consisting of the log line string and 0. A '-' value in the content size field is cleaned up by substituting it with 0."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1839\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "'/shuttle/missions/sts-68/news/sts-68-mcc-05.txt'"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Search pattern and extract\n",
    "match = re.search(APACHE_ACCESS_LOG_PATTERN, test_log)\n",
    "print(match.group(9))\n",
    "\n",
    "match.group(6)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "datetime.datetime(1995, 8, 1, 0, 0, 1)"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#  Convert sample `logfile` entry to date time format\n",
    "\n",
    "month_map = {'Jan': 1, 'Feb': 2, 'Mar':3, 'Apr':4, 'May':5, 'Jun':6, 'Jul':7,\n",
    "    'Aug':8,  'Sep': 9, 'Oct':10, 'Nov': 11, 'Dec': 12}\n",
    "\n",
    "\n",
    "def parse_apache_time(s):\n",
    "    \"\"\" Convert date entry in log file to datetime format\"\"\"\n",
    "    return datetime.datetime(int(s[7:11]),\n",
    "                             month_map[s[3:6]],\n",
    "                             int(s[0:2]),\n",
    "                             int(s[12:14]),\n",
    "                             int(s[15:17]),\n",
    "                             int(s[18:20]))\n",
    "\n",
    "\n",
    "# Test\n",
    "parse_apache_time(match.groups()[3])\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "\n",
    "def parseApacheLogLine(logline):\n",
    "    \"\"\" Parse each line entry of the log file \"\"\"\n",
    "    match = re.search(APACHE_ACCESS_LOG_PATTERN, logline)\n",
    "    \n",
    "    # If no match is found, return entry and zero\n",
    "    if match is None:\n",
    "        return(logline, 0)\n",
    "    \n",
    "    # If field_size is empty, initialize to zero\n",
    "    field_size = match.group(9) \n",
    "    if field_size == '_':\n",
    "        size = long(0)\n",
    "    else:\n",
    "        size = long(match.group(9))\n",
    "    return (Row(\n",
    "            host = match.group(1),\n",
    "            client_identd = match.group(2),\n",
    "            user_id = match.group(3),\n",
    "            date_time = parse_apache_time(match.group(4)),\n",
    "            method = match.group(5),\n",
    "            endpoint = match.group(6),\n",
    "            protocol = match.group(7),\n",
    "            response_code = int(match.group(8)),\n",
    "            content_size = size\n",
    "            ), 1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(Row(client_identd='-', content_size=1839L, date_time=datetime.datetime(1995, 8, 1, 0, 0, 1), endpoint='/shuttle/missions/sts-68/news/sts-68-mcc-05.txt', host='in24.inetnebr.com', method='GET', protocol='HTTP/1.0', response_code=200, user_id='-'),\n",
       " 1)"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "parseApacheLogLine(test_log)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Configuration and Initial RDD Creation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 56,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[u'199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] \"GET /history/apollo/ HTTP/1.0\" 200 6245',\n",
       " u'unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] \"GET /shuttle/countdown/ HTTP/1.0\" 200 3985']"
      ]
     },
     "execution_count": 56,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# We first load the text file using sc.textFile(filename) \n",
    "# to convert each line of the file into an element in an RDD.\n",
    "\n",
    "log_fileRDD = sc.textFile(\"../_datasets_downloads/NASA_access_log_Jul95\")\n",
    "log_fileRDD.take(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Test function `parseApacheLogLine` - takes `str` input"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "PythonRDD[6] at RDD at PythonRDD.scala:43"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Next, we use map(parseApacheLogLine) to apply the parse function to each element \n",
    "# (that is, a line from the log file) in the RDD and turn each line into a pair Row object.\n",
    "log_fileRDD = log_fileRDD.map(parseApacheLogLine)\n",
    "log_fileRDD.cache()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "ename": "Py4JJavaError",
     "evalue": "An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 50, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 111, in main\n    process()\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 106, in process\n    serializer.dump_stream(func(split_index, iterator), outfile)\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n    vs = list(itertools.islice(iterator, batch))\n  File \"<ipython-input-11-bdcb8ab03ed4>\", line 15, in parseApacheLogLine\nValueError: invalid literal for long() with base 10: '-'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)\n\tat org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)\n\tat org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:283)\n\tat org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)\n\tat org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:268)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:270)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 111, in main\n    process()\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 106, in process\n    serializer.dump_stream(func(split_index, iterator), outfile)\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n    vs = list(itertools.islice(iterator, batch))\n  File \"<ipython-input-11-bdcb8ab03ed4>\", line 15, in parseApacheLogLine\nValueError: invalid literal for long() with base 10: '-'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)\n\tat org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)\n\tat org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:283)\n\tat org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)\n\tat org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:268)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:270)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-64-d7ffeeb12707>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m     21\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     22\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 23\u001b[0;31m \u001b[0mparsed_logs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0maccess_logs\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mfailed_logs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mparseLogs\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[0;32m<ipython-input-64-d7ffeeb12707>\u001b[0m in \u001b[0;36mparseLogs\u001b[0;34m()\u001b[0m\n\u001b[1;32m      7\u001b[0m     \u001b[0mfailed_logs\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m(\u001b[0m\u001b[0mparsed_logs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfilter\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmap\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      8\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 9\u001b[0;31m     \u001b[0mfailed_logs_count\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mfailed_logs\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     10\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     11\u001b[0m     \u001b[0;32mif\u001b[0m \u001b[0mfailed_logs_count\u001b[0m \u001b[0;34m>\u001b[0m \u001b[0;36m0\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/pyspark/rdd.pyc\u001b[0m in \u001b[0;36mcount\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m   1002\u001b[0m         \u001b[0;36m3\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1003\u001b[0m         \"\"\"\n\u001b[0;32m-> 1004\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0m_\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mi\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m   1005\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1006\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0mstats\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/pyspark/rdd.pyc\u001b[0m in \u001b[0;36msum\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    993\u001b[0m         \u001b[0;36m6.0\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    994\u001b[0m         \"\"\"\n\u001b[0;32m--> 995\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;32mlambda\u001b[0m \u001b[0mx\u001b[0m\u001b[0;34m:\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0msum\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mx\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfold\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0moperator\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0madd\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    996\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    997\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0mcount\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/pyspark/rdd.pyc\u001b[0m in \u001b[0;36mfold\u001b[0;34m(self, zeroValue, op)\u001b[0m\n\u001b[1;32m    867\u001b[0m         \u001b[0;31m# zeroValue provided to each partition is unique from the one provided\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    868\u001b[0m         \u001b[0;31m# to the final reduce call\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 869\u001b[0;31m         \u001b[0mvals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mmapPartitions\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mfunc\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollect\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    870\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mreduce\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mop\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mvals\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mzeroValue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    871\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/pyspark/rdd.pyc\u001b[0m in \u001b[0;36mcollect\u001b[0;34m(self)\u001b[0m\n\u001b[1;32m    769\u001b[0m         \"\"\"\n\u001b[1;32m    770\u001b[0m         \u001b[0;32mwith\u001b[0m \u001b[0mSCCallSiteSync\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcontext\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0mcss\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 771\u001b[0;31m             \u001b[0mport\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mctx\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonRDD\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcollectAndServe\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mrdd\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    772\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0m_load_from_socket\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mport\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jrdd_deserializer\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    773\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m    811\u001b[0m         \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    812\u001b[0m         return_value = get_return_value(\n\u001b[0;32m--> 813\u001b[0;31m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m    814\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    815\u001b[0m         \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/pyspark/sql/utils.pyc\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m     43\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     44\u001b[0m         \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 45\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     46\u001b[0m         \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     47\u001b[0m             \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/RichardAfolabi/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m    306\u001b[0m                 raise Py4JJavaError(\n\u001b[1;32m    307\u001b[0m                     \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 308\u001b[0;31m                     format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m    309\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    310\u001b[0m                 raise Py4JError(\n",
      "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 50, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 111, in main\n    process()\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 106, in process\n    serializer.dump_stream(func(split_index, iterator), outfile)\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n    vs = list(itertools.islice(iterator, batch))\n  File \"<ipython-input-11-bdcb8ab03ed4>\", line 15, in parseApacheLogLine\nValueError: invalid literal for long() with base 10: '-'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)\n\tat org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)\n\tat org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:283)\n\tat org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)\n\tat org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:268)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:270)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)\n\tat scala.Option.foreach(Option.scala:236)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)\n\tat org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)\n\tat org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)\n\tat org.apache.spark.rdd.RDD.withScope(RDD.scala:316)\n\tat org.apache.spark.rdd.RDD.collect(RDD.scala:926)\n\tat org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)\n\tat org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:497)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)\n\tat py4j.Gateway.invoke(Gateway.java:259)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 111, in main\n    process()\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/worker.py\", line 106, in process\n    serializer.dump_stream(func(split_index, iterator), outfile)\n  File \"/Users/RichardAfolabi/spark/python/lib/pyspark.zip/pyspark/serializers.py\", line 263, in dump_stream\n    vs = list(itertools.islice(iterator, batch))\n  File \"<ipython-input-11-bdcb8ab03ed4>\", line 15, in parseApacheLogLine\nValueError: invalid literal for long() with base 10: '-'\n\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:129)\n\tat org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:125)\n\tat org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)\n\tat org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:283)\n\tat org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)\n\tat org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:268)\n\tat org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:270)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:89)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
     ]
    }
   ],
   "source": [
    "def parseLogs():\n",
    "    \"\"\" Read and parse log file. \"\"\"\n",
    "    parsed_logs = (sc.textFile(logDataFile).map(parseApacheLogLine).cache())\n",
    "    \n",
    "    access_logs = (parsed_logs.filter(lambda s: s[1] == 1).map(lambda s: s[0]).cache())\n",
    "    \n",
    "    failed_logs = (parsed_logs.filter(lambda s: s[1] == 0).map(lambda s: s[0]))\n",
    "    \n",
    "    failed_logs_count = failed_logs.count()\n",
    "    \n",
    "    if failed_logs_count > 0:\n",
    "        print(\"Number of invalud logline\" % failed_logs_count())\n",
    "        for line in failed_logs.take(20):\n",
    "            print(\"Invalid login: %s\" %line)\n",
    "    \n",
    "    print(\"Lines red : %d, \\n Parsed successfully: %d \\n Failed parse: %d\"% (parsed_logs.count(),\n",
    "                                                                            access_logs.count(),\n",
    "                                                                            failed_logs.count()))\n",
    "    return parsed_logs, access_logs, failed_logs\n",
    "\n",
    "\n",
    "\n",
    "parsed_logs, access_logs, failed_logs = parseLogs()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
